1 module collie.codec.lengthbaseframe;
2 
3 import std.bitmanip;
4 import std.conv;
5 import kiss.logger;
6 import kiss.event;
7 
8 import collie.channel;
9 import collie.codec.exception;
10 import kiss.container.ByteBuffer;
11 
12 /// The Pack format
13 /// header: ubytes 4 "00 00 00 00" -> uint 
14 /// Compress Type: ubyte one "00"
15 /// the data is a data.
16 
17 class MsgLengthTooBig : CollieCodecException
18 {
19 	pure nothrow @nogc @safe this(string msg, string file = __FILE__, size_t line = __LINE__)
20 	{
21 		super(msg, file, line);
22 	}
23 }
24 
25 class LengthBasedFrame(bool littleEndian = false) : Handler!(const(ubyte[]),ubyte[],ubyte[],StreamWriteBuffer)
26 {
27 	this(uint max, ubyte compressType = 0x00)
28 	{
29 		_max = max;
30 		_compressType = compressType;
31 		//    clear();
32 	}
33 
34 	final override void read(Context ctx, const(ubyte[]) msg)
35 	{
36 
37 		void doFireRead()
38 		{
39 			if(_data.length > 0)
40 				_data = unCompress(_readComType,_data);
41 			ctx.fireRead(_data);
42 			_data = null;
43 			_pos = ReadPOS.Length_Begin;
44 		}
45 
46 		size_t len = msg.length;
47 		for(size_t i = 0; i < len; ++i)
48 		{
49 			const ubyte ch = msg[i];
50 			final switch(_pos)
51 			{
52 				case ReadPOS.Length_Begin:
53 					_lenByte[0] = ch;
54 					_pos = ReadPOS.Length_1;
55 					break;
56 				case ReadPOS.Length_1:
57 					_lenByte[1] = ch;
58 					_pos = ReadPOS.Length_2;
59 					break;
60 				case ReadPOS.Length_2:
61 					_lenByte[2] = ch;
62 					_pos = ReadPOS.Length_End;
63 					break;
64 				case ReadPOS.Length_End:
65 					_lenByte[3] = ch;
66 					_pos = ReadPOS.Compress_Type;
67 					break;
68 				case ReadPOS.Compress_Type:
69 					_readComType = ch;
70 					_pos = ReadPOS.Body;
71 					_readLen = 0;
72 					_msgLen = endianToNative!(littleEndian,uint)(_lenByte);
73 					if(_msgLen == 0) {
74 						doFireRead();
75 						continue;
76 					} else if(_msgLen > _max){
77 						throw new MsgLengthTooBig("the max is : " ~ to!string(_max) ~ " the length is :" ~ to!string(_msgLen));
78 					}
79 					_data = new ubyte[_msgLen];
80 					break;
81 				case ReadPOS.Body:
82 				{
83 					const size_t needLen = _msgLen - _readLen;
84 					const size_t canRead = len - i;
85 					logDebug();
86 					if(canRead >= needLen){
87 						auto tlen = i + needLen;
88 						_data[_readLen.._msgLen] = msg[i..tlen];
89 						i = tlen - 1;
90 						doFireRead();
91 					} else {
92 						auto tlen = _readLen + canRead;
93 						_data[_readLen..tlen] = msg[i..len];
94 						_readLen = cast(uint)tlen;
95 						return;
96 					}
97 				}
98 					break;
99 			}
100 		}
101 	}
102 
103 	final override void write(Context ctx, ubyte[] msg, TheCallBack cback = null)
104 	{
105 		logDebug("writeln data!");
106 		try 
107 		{
108 			ubyte ctype = _compressType;
109 			auto tmsg = doCompress(ctype, msg);
110 			uint size = cast(uint) tmsg.length;
111 			if(size > _max){
112 				throw new MsgLengthTooBig("the max is : " ~ to!string(_max) ~ " the length is :" ~ to!string(_msgLen));
113 			}
114 			ubyte[] data = new ubyte[size + 5];
115 			ubyte[4] length = nativeToEndian!(littleEndian,uint)(size); 
116 			data[0 .. 4] = length[];
117 			data[4] = ctype;
118 			data[5 .. $] = tmsg[];
119 			ctx.fireWrite(new SocketStreamBuffer(data,null),null);
120 			if (cback)
121 				cback(msg, size);
122 		}
123 		catch (Exception e)
124 		{
125 			import collie.utils.exception;
126 			showException(e);
127 			if (cback)
128 				cback(msg, 0);
129 		}
130 	}
131 
132 protected:
133 	ubyte[] doCompress(ref ubyte type, ubyte[] data)
134 	{
135 		return data;
136 	}
137 	
138 	ubyte[] unCompress(in ubyte type, ubyte[] data)
139 	{
140 		return data;
141 	}
142 
143 private:
144 	enum ReadPOS : ubyte
145 	{
146 		Length_Begin,
147 		Length_1,
148 		Length_2,
149 		Length_End,
150 		Compress_Type,
151 		Body
152 	}
153 
154 private:
155 	ubyte[] _data;
156 	ubyte[4] _lenByte;
157 	ubyte _readComType;
158 	uint _msgLen;
159 	uint _readLen;
160 	ReadPOS _pos = ReadPOS.Length_Begin;
161 
162 	uint _max;
163 	ubyte _compressType;
164 }
165 
166 
167 unittest
168 {
169 	import collie.net;
170 	import kiss.net.TcpStream;
171 	import collie.channel.handlercontext;
172 	import std.stdio;
173 	
174 	ubyte[] gloaData;
175 	
176 	class Contex : HandlerContext!(ubyte[],StreamWriteBuffer)
177 	{
178 		override void fireRead(ubyte[] msg)
179 		{
180 			writeln("the msg is : ", cast(string) msg);
181 		}
182 		
183 		override void fireTimeOut()
184 		{
185 		}
186 		
187 		override void fireTransportActive()
188 		{
189 		}
190 		
191 		override void fireTransportInactive()
192 		{
193 		}
194 		
195 		override void fireWrite(StreamWriteBuffer msg, void delegate(StreamWriteBuffer, size_t) cback = null)
196 		{
197 			auto data = msg.sendData;
198 			gloaData ~= data;
199 			writeln("length is : ", data[0 .. 4], " \n the data is : ", cast(string)(data[4 .. $]));
200 		}
201 		
202 		override void fireClose()
203 		{
204 		}
205 		
206 		override @property PipelineBase pipeline()
207 		{
208 			return null;
209 		}
210 		
211 		override @property Transport transport()
212 		{
213 			return null;
214 		}
215 	}
216 	
217 	Contex ctx = new Contex();
218 	
219 	auto hander = new LengthBasedFrame!false(2048);
220 	string data = "i am a test string";
221 	ubyte[] tdata = cast(ubyte[]) data;
222 	hander.write(ctx, tdata);
223 	
224 	hander.write(ctx, gloaData);
225 	
226 	hander.read(ctx, gloaData);
227 	
228 	hander.read(ctx, gloaData[0 .. 3]);
229 	hander.read(ctx, gloaData[3 .. 20]);
230 	hander.read(ctx, gloaData[20 .. $]);
231 	
232 }